[SPARK-19263] DAGScheduler should avoid sending conflicting task set. #16620
Conversation
Thanks for the work thus far, @jinxing64, but this really needs updated test coverage before we can consider merging it.
ok to test
Beyond the lack of new tests, this patch is causing a couple of existing DAGSchedulerSuite tests to fail for me locally: "run trivial shuffle with out-of-band failure and retry" and "map stage submission with executor failure late map task completions"
Thanks for pointing out this issue, and the nice description. Still looking, but it sounds like a legitimate issue. I think ...
Jenkins, test this please
Test build #3540 has finished for PR 16620 at commit
@squito
Force-pushed from d3b6ebb to b20d316
@markhamstra @squito
```scala
  taskScheduler.rootPool.getSortedTaskSetQueue.exists {
    tsm => tsm.stageId == stageId && !tsm.isZombie
  }
} else false
```
The if...else is unnecessary:

```scala
val activeTaskSetManagerExist =
  taskScheduler.rootPool != null &&
    taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm =>
      tsm.stageId == stageId && !tsm.isZombie
    }
```
```
@@ -1193,7 +1193,15 @@ class DAGScheduler(
    }

    if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
      markStageAsFinished(shuffleStage)
      val activeTaskSetManagerExist =
```
nit: should be activeTaskSetManagerExists
And since it is being used as `!activeTaskSetManagerExists`, you could reverse the sense, avoid needing the `!`, and call it something like `noActiveTaskSetManager`.
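Concretely, that reversed-sense variant might look like this (a sketch only, assuming the same `taskScheduler.rootPool` context as the patch; it is just De Morgan applied to the expression above):

```scala
// True when no non-zombie TaskSetManager exists for this stage, so the `!`
// disappears at the use site.
val noActiveTaskSetManager =
  taskScheduler.rootPool == null ||
    !taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm =>
      tsm.stageId == stageId && !tsm.isZombie
    }
```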
Force-pushed from b20d316 to be8bfe5
@markhamstra
Jenkins, ok to test
Test build #71874 has finished for PR 16620 at commit
Thanks @jinxing64 for working on this. I'm sorry that at the moment my comments are mostly critical, without providing very constructive advice. I'll keep thinking about this but I thought I'd share my feedback now.
This is a really important fix, and the work you are doing on it is great -- but it's also tricky enough that I want to make sure we put in a change that improves the clarity of the code and that we feel confident in.
```
@@ -1218,7 +1225,9 @@ class DAGScheduler(
    logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
      ") because some of its tasks had failed: " +
      shuffleStage.findMissingPartitions().mkString(", "))
    submitStage(shuffleStage)
    if (noActiveTaskSetManager) {
```
Shouldn't this condition go into the surrounding `if (!shuffleStage.isAvailable)`? The logInfo is very confusing in this case otherwise.
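Something like the following restructuring would address that (a sketch, keeping the patch's `noActiveTaskSetManager` flag; the surrounding code is abbreviated):

```scala
// Only log and resubmit when the stage is actually missing output AND no
// active TaskSetManager is still working on it, so the message cannot mislead.
if (!shuffleStage.isAvailable && noActiveTaskSetManager) {
  logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
    ") because some of its tasks had failed: " +
    shuffleStage.findMissingPartitions().mkString(", "))
  submitStage(shuffleStage)
}
```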
```scala
case (1, 1) =>
  // Wait long enough until Success of task(stageAttempt=1 and partition=0)
  // is handled by DAGScheduler.
  Thread.sleep(5000)
```
hmm, this is a nuisance. I don't see any good way to get rid of this sleep... but now that I think about it, why can't you do this in `DAGSchedulerSuite`? It seems like this can be entirely contained to the `DAGScheduler` and doesn't require tricky interactions with other parts of the scheduler. (I'm sorry I pointed you in the wrong direction earlier -- I thought perhaps you had tried to copy the examples of `DAGSchedulerSuite` but there was some reason you couldn't.)
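For reference, the `DAGSchedulerSuite` pattern being suggested drives the scheduler with synthetic completion events on the test thread instead of real executors, so no sleeps are needed; roughly (helper names as they appear later in this thread, the assertion is only illustrative):

```scala
// runEvent processes the completion synchronously, so assertions can run
// immediately afterwards -- no Thread.sleep required.
runEvent(makeCompletionEvent(
  taskSets(1).tasks(0), Success, makeMapStatus("hostA", 2)))
assert(taskSets.size === 2)
```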
```scala
assert(results === (0 until 2).map { _ -> 10 }.toMap)
}

def waitUntilConditionBecomeTrue(condition: => Boolean, timeout: Long, msg: String): Unit = {
```
nit: rename to `waitForCondition` (maybe irrelevant given other comments)
```scala
}
if (shuffleStage.isAvailable || noActiveTaskSetManager) {
  markStageAsFinished(shuffleStage)
}
```
I have to admit, though this passes all the tests, this is really confusing to me. I only somewhat understand why your original version didn't work, and why this should be used instead. Perhaps some more commenting here would help? The condition under which you do `markStageAsFinished` seems very broad, so perhaps it's worth a comment on the case when you do not (and perhaps even a `logInfo` in an `else` branch). The discrepancy between pendingPartitions and availableOutputs is also surprising -- perhaps that is worth extra comments on `Stage`, on how the meanings of those two are different.
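Purely as an illustration of that suggestion (not a proposed final wording), the negative case could be documented and logged like so:

```scala
if (shuffleStage.isAvailable || noActiveTaskSetManager) {
  markStageAsFinished(shuffleStage)
} else {
  // An active TaskSetManager is still running tasks for this stage, so it
  // is not safe to mark the stage as finished yet.
  logInfo(s"Not marking $shuffleStage as finished: tasks are still running")
}
```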
Force-pushed from 217aa44 to 3f0ebb8
@squito In current ... When handle ...
@squito I don't like the ...
Test build #71979 has finished for PR 16620 at commit
Failed to pass the unit test. I will keep working on this.
Test build #72023 has finished for PR 16620 at commit
```scala
  s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
  s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
}
```
The `Success` is handled in `DAGScheduler` in a different thread. `DAGScheduler` perhaps needs to check the `TaskSetManager`'s status, e.g. `isZombie`. Moving the code here makes it safe for `DAGScheduler` to check the status of the `TaskSetManager` when handling `Success`.
Could this be moved before `maybeFinishTaskSet()`, if you only need `isZombie=true`? For performance it's helpful to hand off to the DAGScheduler thread as soon as we can. Probably not a huge impact, but we should try to avoid impacting performance where possible.
@squito Yes, it makes sense to move this part before `maybeFinishTaskSet()`; I will refine.
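The reordering under discussion would look roughly like this in `TaskSetManager` (a sketch; the `taskAttempts(index)` loop header is an assumption about the surrounding code, while the log message and `killTask` call are quoted from the diff above):

```scala
// Kill any other running attempts of this task *before* handing off via
// maybeFinishTaskSet(), so the DAGScheduler thread sees the zombie/kill
// state as early as possible.
for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
  logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${info.id} " +
    s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
    s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
  sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
}
maybeFinishTaskSet()
```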
Test build #72028 has finished for PR 16620 at commit
@squito
@squito
This looks great -- I just left some nits on improving test commenting.
```
@@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
    }
  }

  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    " even with late completions from earlier stage attempts") {
```
nit: indent two more spaces
```scala
taskSets(1).tasks(0),
FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
  "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
null))
```
can you pass null as a named parameter here? ("parameterName = null")
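That is, something like the following, where "result" is an assumed name for `makeCompletionEvent`'s third parameter, per the reviewer's "parameterName = null" placeholder:

```scala
runEvent(makeCompletionEvent(
  taskSets(1).tasks(0),
  FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
    "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
  result = null)) // named, so the reader knows what the null stands for
```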
```scala
(Success, makeMapStatus("hostB", 2)),
(Success, makeMapStatus("hostB", 2))))

// Task succeeds on a failed executor. The success is bogus.
```
can you change the 2nd sentence to "The success should be ignored because the task started before the executor failed, so the output may have been lost."
```scala
runEvent(makeCompletionEvent(
  taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))

assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
```
should the second part have taskSets(3) instead of taskSets(2)?
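That is, presumably the assertion was meant to read:

```scala
assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1)
```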
```scala
submit(rddC, Array(0, 1))

assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
```
can you comment this like I suggested in your other PR?
```scala
runEvent(makeCompletionEvent(
  taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

// There should be no new attempt of stage submitted.
```
can you add "because task 1 is still running in the current attempt (and hasn't completed successfully in any earlier attempts)."
```scala
  taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))

// There should be no new attempt of stage submitted.
assert(taskSets.size === 4)
```
is this the line that would fail without your change? (just verifying my understanding)
Yes, I think so : )
```scala
runEvent(makeCompletionEvent(
  taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))

// ResultStage submitted.
```
"Now the ResultStage should be submitted, because all of the tasks to generate rddB have completed successfully on alive executors."
Test build #72849 has finished for PR 16620 at commit
@kayousterhout
Thanks for the careful analysis and writeup, Kay. This version makes sense to me.
```scala
// completed successfully from the perspective of the TaskSetManager, mark it as
// no longer pending (the TaskSetManager may consider the task complete even
// when the output needs to be ignored because the task's epoch is too small below).
shuffleStage.pendingPartitions -= task.partitionId
```
I think it's worth also explaining how this inconsistency between pendingPartitions and outputLocations gets resolved. IIUC, it's that when pendingPartitions is empty, the scheduler will check outputLocations, realize something is missing, and resubmit this stage.
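In sketch form, the resolution path being described is roughly (simplified; not the exact merged code):

```scala
// Once nothing is pending, the stage is marked finished; if some map output
// was ignored because of a stale epoch, the stage won't be "available"
// (its output locations are incomplete), so it gets resubmitted.
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
  markStageAsFinished(shuffleStage)
  if (!shuffleStage.isAvailable) {
    submitStage(shuffleStage)
  }
}
```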
@squito
Test build #72912 has finished for PR 16620 at commit
Test build #72913 has finished for PR 16620 at commit
```scala
// when the output needs to be ignored because the task's epoch is too small below,
// if so, this can result in inconsistency between pending partitions and output
// locations of stage. When pending partitions is empty, the scheduler will check
// output locations, if there is missing, the stage will be resubmitted.
```
one more proposal to improve this comment:
...epoch is too small below. In this case, when pending partitions is empty, there will still be missing output locations, which will cause the DAGScheduler to resubmit the stage below.)
LGTM pending one last comment improvement
Yes, refined : )
Test build #72974 has finished for PR 16620 at commit
LGTM! Thanks for finding this subtle bug and all of the hard work to fix it @jinxing64. I'll wait until tomorrow to merge this to give Mark and Imran a chance for any last comments.
@kayousterhout @squito @markhamstra
LGTM
Thanks all for the work on this! I've merged this into master.
This commit improves the tests that check the case when a ShuffleMapTask completes successfully on an executor that has failed. This commit improves the commenting around the existing test for this, and adds some additional checks to make it more clear what went wrong if the tests fail (the fact that these tests are hard to understand came up in the context of markhamstra's proposed fix for #16620). This commit also removes a test that I realized tested exactly the same functionality.

markhamstra, I verified that the new version of the test still fails (and in a more helpful way) for your proposed change for #16620.

Author: Kay Ousterhout <[email protected]>

Closes #16892 from kayousterhout/SPARK-19560.
The pendingPartitions instance variable should be moved to ShuffleMapStage, because it is only used by ShuffleMapStages. This change is purely refactoring and does not change functionality. I fixed this in an attempt to clarify some of the discussion around apache#16620, which I was having trouble reasoning about. I stole the helpful comment Imran wrote for pendingPartitions and used it here.

cc squito markhamstra jinxing64

Author: Kay Ousterhout <[email protected]>

Closes apache#16876 from kayousterhout/SPARK-19537.
In current `DAGScheduler handleTaskCompletion` code, when event.reason is `Success`, it will first do `stage.pendingPartitions -= task.partitionId`, which may be a bug when `FetchFailed` happens.

**Think about below**

1. Stage 0 runs and generates shuffle output data.
2. Stage 1 reads the output from stage 0 and generates more shuffle data. It has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are launched on executorA.
3. ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the driver. The driver marks executorA as lost and updates failedEpoch.
4. The driver resubmits stage 0 so the missing output can be re-generated, and then once it completes, resubmits stage 1 with ShuffleMapTask1x and ShuffleMapTask2x.
5. ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes on executorA and sends Success back to the driver. This causes DAGScheduler::handleTaskCompletion to remove partition 2 from stage.pendingPartitions (line 1149), but it does not add the partition to the set of output locations (line 1192), because the task's epoch is less than the failure epoch for the executor (because of the earlier failure on executor A).
6. ShuffleMapTask1x successfully finishes on executorB, causing the driver to remove partition 1 from stage.pendingPartitions. Combined with the previous step, this means that there are no more pending partitions for the stage, so the DAGScheduler marks the stage as finished (line 1196). However, the shuffle stage is not available (line 1215) because the completion for ShuffleMapTask2 was ignored because of its epoch, so the DAGScheduler resubmits the stage.
7. ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is called for the re-submitted stage, it throws an error, because there's an existing active task set.

**In this fix**

If a task completion is from a previous stage attempt and the epoch is too low (i.e., it was from a failed executor), don't remove the corresponding partition from pendingPartitions.

Author: jinxing <[email protected]>
Author: jinxing <[email protected]>

Closes apache#16620 from jinxing64/SPARK-19263.
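In sketch form, the merged approach described in that commit message looks roughly like this (simplified from the actual `handleTaskCompletion` changes; a paraphrase, not the exact diff):

```scala
// Inside handleTaskCompletion's match on the completed task:
case smt: ShuffleMapTask =>
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
    // The task belongs to the currently running stage attempt: the
    // TaskSetManager considers it done, so stop treating the partition as
    // pending even if the output itself is ignored below.
    shuffleStage.pendingPartitions -= task.partitionId
  }
  if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    // Stale epoch: the executor failed after this task started, so the
    // output may be lost; do NOT register it (and, for a task from an
    // earlier stage attempt, do not touch pendingPartitions either).
    logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
  } else {
    shuffleStage.addOutputLoc(smt.partitionId, status)
  }
```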
What changes were proposed in this pull request?

In current `DAGScheduler handleTaskCompletion` code, when event.reason is `Success`, it will first do `stage.pendingPartitions -= task.partitionId`, which may be a bug when `FetchFailed` happens. Think about the scenario spelled out in the commit message above.

In this fix

Add a check for whether there is already an active (not zombie) TaskSetManager before resubmission.

How was this patch tested?

Added a unit test in `DAGSchedulerSuite`.
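Stitched together from the fragments quoted in the review, the added test has roughly this shape (heavily elided; the `...` comments stand for the completion and failure events shown above):

```scala
test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    " even with late completions from earlier stage attempts") {
  // Two shuffle stages feeding a result stage: rddA -> rddB -> rddC.
  submit(rddC, Array(0, 1))
  assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
  // ... complete stage 0; fail a fetch in stage 1 attempt 0; deliver the
  // late, bogus Success from the failed executor; finish attempt 1 ...
  runEvent(makeCompletionEvent(
    taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
  // The key assertion: no conflicting fifth task set was ever created.
  assert(taskSets.size === 4)
}
```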